import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.call;

import java.util.Arrays;

import static org.apache.flink.table.api.Expressions.$;

public class Map {



    public static void main(String[] args) throws Exception
    {

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);

        ScalarFunction func = new MyMapFunction();
        tEnv.registerFunction("func", func);


        DataSet<Order> ds1 = env.fromCollection(Arrays.asList(
                new Order(1L, "beer", 3),
                new Order(3L, "rubber", 2),
                new Order(1L, "diaper", 4)
        ));
        Table input = tEnv.fromDataSet(ds1,$("user"),$("product"),$("amount"));

        input.map(call("func","product")).as("a", "b");
        tEnv.toDataSet(input, Row.class).print();

    }
}
